package com.example.dobs.demo.flink.realtime.report.study.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWC {
    public static void main(String[] args) throws Exception {
        int port ;
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        if (parameterTool.has("port"))
            port = parameterTool.getInt("port");
        else
            port = 9000;
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> textStream = environment.socketTextStream("localhost", port, "\n");

        SingleOutputStreamOperator<WC> singleOutputStreamOperator = textStream.flatMap(new FlatMapFunction<String, WC>() {
            public void flatMap(String s, Collector<WC> collector) throws Exception {
                String[] splits = s.split("\\s");
                for (String word : splits) {
                    collector.collect(new WC(word, 1L));
                }
            }
        }).keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1))
                .sum("count");
        singleOutputStreamOperator.print().setParallelism(1);
        environment.execute("SocketWindowWC");
    }


    public static class WC{
        public String word;
        public long count;
        public WC(){}
        public WC(String word,long count){
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WC{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
